package com.clx.risk.flink.demo.sources;

import com.alibaba.fastjson.JSONObject;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.Random;

public class JsonStringSourceFunction implements SourceFunction<String> {
    private boolean running = true;

    @Override
    public void run(SourceFunction.SourceContext<String> ctx) throws Exception {
        Random random = new Random();
        int i = 0;
        while (running) {
            i++;
            long l = System.currentTimeMillis();
            JSONObject jsonObject = new JSONObject();

            jsonObject.put("id", i);
            jsonObject.put("transJnlNo", l);
            jsonObject.put("productCode", "zhongyioali_" + (i % 3));
            jsonObject.put("key", "" + random.nextInt(10));
            jsonObject.put("status", "" + (i % 10));
            jsonObject.put("dateTime", l);
            ctx.collect(jsonObject.toJSONString());
            Thread.sleep(100);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}